package com.vaye.im.extension;

import lombok.extern.slf4j.Slf4j;
import org.jgroups.*;
import org.jgroups.util.Util;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 节点
 *  <pre>
 *  tcp模式下:
 *  如果是同一台机器测试,请注意在
 *  TCPPING 元素下修改 initial_hosts的配置端口:
 *  例如:"${jgroups.tcpping.initial_hosts:192.168.19.100[7800],192.168.19.100[7801]}
 *  如果是多台机器测试,请注意在
 *  TCPPING 元素下修改 initial_hosts的ip,端口随意:
 *  例如:"${jgroups.tcpping.initial_hosts:192.168.19.100[7800],192.168.19.178[7800]}
 *
 *  udp模式下:
 *  同一台机器的不同端口(端口是动态的)可通信.
 *  不同机器之间的ip多播可能会受到一些因素限制而造成节点之间无法彼此发现.
 *  </pre>
 * @author wangzhiyong
 * @module admin
 * @date 2023年07月12日 下午3:38
 */
@Slf4j
@Component
public class Node extends ReceiverAdapter {

    //集群名称
    private static final String CLUSTER_NAME = "ChatCluster";

    //配置文件
    private static final String CONFIG_XML = "network-udp.xml";

    //节点通道
    private JChannel jChannel;

    private ReentrantLock lock = new ReentrantLock();

    //存放消息记录
    private final List<String> state = new LinkedList<>();

    /**
     * 启动节点
     */
//    @PostConstruct
    public void init(){
        log.info("开始启动节点");
        InputStream is = this.getClass().getClassLoader().getResourceAsStream(CONFIG_XML);
        try {
//            jChannel = new JChannel(is);
            //默认配置文件
            jChannel = new JChannel();
            jChannel.setReceiver(this);
            jChannel.connect(CLUSTER_NAME);
            jChannel.getState(null,50000);
            log.info("启动节点启动成功");
        } catch (Exception e) {
            log.error("启动节点异常!", e);
            e.printStackTrace();
        }
    }

    /**
     * 发送消息
     * @author wangzhiyong
     * @date 2023/7/12 下午3:52
     * @param address 目标地址
     * @param msgContent 消息内容
     */
    public void senMsg(Address address,Object msgContent){
        Message msg = new Message(address,msgContent);
        try {
            jChannel.send(msg);
        } catch (Exception e) {
            log.error("消息发送失败");
            e.printStackTrace();
        }
    }

    /**
     * 获取状态（该方法在状态提供的程序中被调用）
     * @author wangzhiyong
     * @date 2023/7/12 下午4:02
     * @param output
     */
    @Override
    public void getState(OutputStream output) {
        lock.lock();
        try {
            Util.objectToStream(state,new DataOutputStream(output));
        } catch (Exception e) {
            log.error("获取状态失败");
        }finally {
            lock.unlock();
        }
    }

    /**
     * 设置状态(该方法在程序请求程序中被调用)
     * @author wangzhiyong
     * @date 2023/7/12 下午4:10
     * @param input 输入流
     */
    @Override
    public void setState(InputStream input) throws Exception {
        lock.lock();
        try {
            List<String> list;
            list = Util.objectFromStream(new DataInputStream(input));
            synchronized (state) {
                state.clear();
                state.addAll(list);
            }
            System.out.println(list.size() + " messages in chat history):");
            list.forEach(System.out::println);
        } catch (Exception e) {
            log.error("从主节点同步状态到当前节点发生异常!{}",e);
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    /**
     * 接收消息
     * @author wangzhiyong
     * @date 2023/7/12 下午4:04
     * @param msg
     */
    @Override
    public void receive(Message msg) {
        if (jChannel.getAddress().equals(msg.getSrc())) {   //忽略自己发送的消息
            return;
        }
        String format = String.format("%s:%s", msg.getSrc(), msg.getObject());
        log.info(format);
        state.add(format);
    }

    /**
     * 视图接收（该方法主要作用：监听集群中成员变化，eg：新实例加入集群、新实例离开集群【包含崩溃情况】）
     * 它的toString()方法打印视图ID(一个递增的ID)和集群中当前实例的列表
     * @author wangzhiyong
     * @date 2023/7/12 下午4:18
     * @param view
     */
    @Override
    public void viewAccepted(View view) {
        log.info("成员变更通知：viewInfo={}",view.toString());
    }
}
